Quick Start#

This quick guide shows how you can develop a scalable forecasting system using ForecastFlowML.

Scenario#

  • The dataset contains 3 regions, and you want to develop an individual model for each region.

  • Each region's data is small enough to fit into a single machine's memory, but the combined data for all 3 regions is large enough to cause memory issues.

What We Will Do#

  • Build an independent model for each of the 3 regions.

  • Parallelize training/inference steps.

  • Use LightGBM as the machine learning algorithm.

  • Develop direct multi-step forecasting using LightGBM.

  • Perform backtesting.

Import packages#

from forecastflowml.meta_model import ForecastFlowML
from forecastflowml.preprocessing import FeatureExtractor
from forecastflowml.data.loader import load_walmart_m5
from lightgbm import LGBMRegressor
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "notebook"
pd.set_option('display.max_columns', 100)

Initialize Spark#

# Local 4-core Spark session; shuffle partitions kept low for a small demo.
_spark_conf = {
    "spark.driver.memory": "8g",
    "spark.sql.shuffle.partitions": "4",
}
_builder = SparkSession.builder.master("local[4]")
for _key, _value in _spark_conf.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

Sample Dataset#

# Load the sample Walmart M5 dataset as a Spark DataFrame; localCheckpoint
# materializes it on the executors and truncates the lineage.
df = load_walmart_m5(spark).localCheckpoint()
# Preview a few rows (limit before toPandas so only 10 rows are collected).
df.limit(10).toPandas().head(5)
id item_id dept_id cat_id store_id state_id sales date christmas
0 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 2.0 2011-01-29 0
1 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 5.0 2011-01-30 0
2 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 3.0 2011-01-31 0
3 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 0.0 2011-02-01 0
4 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 0.0 2011-02-02 0

Feature Engineering#

# Time-series feature engineering: lags, rolling means, calendar features,
# consecutive-zero counters, and series history length.
feature_extractor = FeatureExtractor(
    id_col="id",
    date_col="date",
    target_col="sales",
    lag_window_features={
        # weekly lags: 7, 14, ..., 56 days back
        "lag": [7, 14, 21, 28, 35, 42, 49, 56],
        # [window, lag] pairs: rolling means over 7/14/30-day windows,
        # each shifted back by 7/14/21/28 days
        "mean": [
            [7, 7], [14, 7], [30, 7],
            [7, 14], [14, 14], [30, 14],
            [7, 21], [14, 21], [30, 21],
            [7, 28], [14, 28], [30, 28],
        ],
    },
    date_features=[
        "day_of_month",
        "day_of_week",
        "week_of_year",
        "quarter",
        "month",
        "year",
    ],
    # count consecutive occurrences of value 0 (zero-sales streaks),
    # evaluated at weekly lags
    count_consecutive_values={
        "value": 0,
        "lags": [7, 14, 21, 28],
    },
    history_length=True,
)
df_features = feature_extractor.transform(df).localCheckpoint()
df_features.limit(10).toPandas().head(5)
date id cat_id item_id lag_42 count_consecutive_value_lag_21 lag_7 window_30_lag_14_mean window_14_lag_14_mean window_30_lag_21_mean count_consecutive_value_lag_28 lag_49 lag_56 window_14_lag_28_mean dept_id window_30_lag_7_mean count_consecutive_value_lag_14 window_7_lag_28_mean window_30_lag_28_mean window_7_lag_21_mean window_14_lag_7_mean sales lag_21 lag_28 lag_35 window_7_lag_14_mean window_14_lag_21_mean window_7_lag_7_mean count_consecutive_value_lag_7 store_id christmas state_id lag_14 history_length day_of_month day_of_week week_of_year quarter month year
0 2011-01-31 FOODS_1_011_WI_2_evaluation FOODS FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN FOODS_1 NaN NaN NaN NaN NaN NaN 2.0 NaN NaN NaN NaN NaN NaN NaN WI_2 0 WI NaN 1 31 2 5 1 1 2011
1 2011-02-01 FOODS_1_011_WI_2_evaluation FOODS FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN FOODS_1 NaN NaN NaN NaN NaN NaN 0.0 NaN NaN NaN NaN NaN NaN NaN WI_2 0 WI NaN 2 1 3 5 1 2 2011
2 2011-02-02 FOODS_1_011_WI_2_evaluation FOODS FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN FOODS_1 NaN NaN NaN NaN NaN NaN 0.0 NaN NaN NaN NaN NaN NaN NaN WI_2 0 WI NaN 3 2 4 5 1 2 2011
3 2011-02-03 FOODS_1_011_WI_2_evaluation FOODS FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN FOODS_1 NaN NaN NaN NaN NaN NaN 0.0 NaN NaN NaN NaN NaN NaN NaN WI_2 0 WI NaN 4 3 5 5 1 2 2011
4 2011-02-04 FOODS_1_011_WI_2_evaluation FOODS FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN FOODS_1 NaN NaN NaN NaN NaN NaN 0.0 NaN NaN NaN NaN NaN NaN NaN WI_2 0 WI NaN 5 4 6 5 1 2 2011

Split dataset into train and test#

# Everything up to the cutoff date is training data; the rest is the
# future period we will forecast.
train_end_date = "2016-05-22"
df_train = df_features.filter(F.col("date") <= train_end_date)
df_future = df_features.filter(F.col("date") > train_end_date)

Training#

# How the input dataset is organized: one model per state_id group,
# daily series keyed by (id, date) with "sales" as the target.
dataset_params = dict(
    group_col="state_id",
    id_col="id",
    date_col="date",
    target_col="sales",
    date_frequency="days",
)

# Direct multi-step setup: each LightGBM model covers a 7-day chunk,
# so four models span the full 28-day forecast horizon.
model_params = dict(
    model_horizon=7,
    max_forecast_horizon=28,
    model=LGBMRegressor(),
)

model = ForecastFlowML(**dataset_params, **model_params)
trained_models = model.train(df_train).localCheckpoint()
trained_models.limit(3).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:

It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
group forecast_horizon model start_time end_time elapsed_seconds
0 CA [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (03:22:30) 05-Apr-2023 (03:22:42) 11.6
1 TX [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (03:22:32) 05-Apr-2023 (03:22:43) 11.1
2 WI [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (03:22:43) 05-Apr-2023 (03:22:48) 5.2

Backtesting#

# Backtest with 3 cross-validation splits over the training period.
n_splits = 3
cv_forecast = model.cross_validate(df_train, n_cv_splits=n_splits).localCheckpoint()
cv_forecast.limit(5).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:

It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
group id date cv target forecast
0 CA FOODS_1_051_CA_4_evaluation 2016-04-25 0 1.0 1.138023
1 CA FOODS_1_051_CA_4_evaluation 2016-04-26 0 1.0 1.083809
2 CA FOODS_1_051_CA_4_evaluation 2016-04-27 0 0.0 1.046020
3 CA FOODS_1_051_CA_4_evaluation 2016-04-28 0 1.0 1.153849
4 CA FOODS_1_051_CA_4_evaluation 2016-04-29 0 2.0 1.164669
5 CA FOODS_1_051_CA_4_evaluation 2016-04-30 0 1.0 1.419541
6 CA FOODS_1_051_CA_4_evaluation 2016-05-01 0 0.0 1.511467
7 CA FOODS_1_179_CA_2_evaluation 2016-04-25 0 1.0 0.426190
8 CA FOODS_1_179_CA_2_evaluation 2016-04-26 0 0.0 0.419881
9 CA FOODS_1_179_CA_2_evaluation 2016-04-27 0 0.0 0.376968

Plot cross validation forecasts#

# Join actual sales with CV forecasts, then pivot the split index ("cv")
# into one forecast column per split.
actuals = df_train.select("id", "state_id", "date", "sales")
cv_preds = cv_forecast.select("id", "date", "cv", "forecast")
pivoted = (
    actuals.join(cv_preds, on=["id", "date"], how="left")
    .groupBy("id", "state_id", "date", "sales")
    .pivot("cv")
    .sum("forecast")
)
# Aggregate item-level series up to state level; pivoted columns are
# named "0", "1", "2" and get renamed to cv_0..cv_2.
cv_split_cols = [F.sum(str(i)).alias(f"cv_{i}") for i in range(3)]
cv_state = (
    pivoted.groupBy("state_id", "date")
    .agg(F.sum("sales").alias("sales"), *cv_split_cols)
    .orderBy("state_id", "date")
).toPandas()

pio.renderers.default = "notebook"
# One facet row per state, actuals overlaid with each CV split's forecast.
fig = px.line(
    cv_state,
    x="date",
    y=["sales"] + [f"cv_{i}" for i in range(3)],
    facet_row="state_id",
    facet_row_spacing=0.01,
    height=1250,
    width=720,
)
legend_style = dict(orientation="h", yanchor="top", y=1.04, xanchor="center", x=0.5)
fig.update_layout(legend=legend_style, margin=dict(l=0, r=10, t=10, b=10))
fig.update_yaxes(matches=None, title="")
fig.update_xaxes(type="date", range=["2015-01-01", "2016-05-22"])

Inference#

# Predict the future period using the previously trained per-group models.
forecast = model.predict(df_future, trained_models)
# Preview a few predicted rows.
forecast.limit(5).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:

It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
id date prediction
0 FOODS_1_051_CA_4_evaluation 2016-05-23 0.979816
1 FOODS_1_051_CA_4_evaluation 2016-05-24 0.890988
2 FOODS_1_051_CA_4_evaluation 2016-05-25 0.794059
3 FOODS_1_051_CA_4_evaluation 2016-05-26 0.929395
4 FOODS_1_051_CA_4_evaluation 2016-05-27 1.031517
# Aggregate historical sales and future predictions to state level so the
# forecast can be plotted as a continuation of the actuals.
state_level = (
    df.select("id", "state_id", "date", "sales")
    .join(forecast, on=["id", "date"], how="left")
    .groupBy("state_id", "date")
    .agg(
        F.sum("sales").alias("sales"),
        F.sum("prediction").alias("prediction"),
    )
    .orderBy("state_id", "date")
)
past_future = state_level.toPandas()

pio.renderers.default = "notebook"
# One facet row per state; "prediction" is non-null only after the cutoff.
fig = px.line(
    past_future,
    x="date",
    y=["sales", "prediction"],
    facet_row="state_id",
    facet_row_spacing=0.01,
    height=1250,
    width=720,
)
legend_style = dict(orientation="h", yanchor="top", y=1.04, xanchor="center", x=0.5)
fig.update_layout(legend=legend_style, margin=dict(l=0, r=10, t=10, b=10))
fig.update_yaxes(matches=None, title="")
fig.update_xaxes(type="date", range=["2015-01-01", "2016-06-19"])